package nq

import (
	"io/ioutil"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/proc"
	"github.com/tal-tech/go-zero/core/queue"
	"github.com/tal-tech/go-zero/core/service"
	"github.com/tal-tech/go-zero/core/stat"
	"github.com/tal-tech/go-zero/core/threading"
	"github.com/tal-tech/go-zero/core/timex"
	"github.com/thinkeridea/go-extend/exbytes"
)

type (
	ConsumeHandle func(key, value string) error

	ConsumeHandler interface {
		Consume(key, value string) error
	}

	queueOptions struct {
		metrics *stat.Metrics
	}

	QueueOption func(*queueOptions)

	nsqQueue struct {
		c                NqConf
		consumer         *nsq.Consumer
		handler          ConsumeHandler
		channel          chan *nsq.Message
		producerRoutines *threading.RoutineGroup
		consumerRoutines *threading.RoutineGroup
		metrics          *stat.Metrics
	}

	nsqQueues struct {
		queues []queue.MessageQueue
		group  *service.ServiceGroup
	}
)

func MustNewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
	q, err := NewQueue(c, handler, opts...)
	if err != nil {
		log.Fatal(err)
	}

	return q
}

func NewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
	var options queueOptions
	for _, opt := range opts {
		opt(&options)
	}
	ensureQueueOptions(c, &options)

	if c.Conns < 1 {
		c.Conns = 1
	}
	q := nsqQueues{
		group: service.NewServiceGroup(),
	}
	for i := 0; i < c.Conns; i++ {
		q.queues = append(q.queues, newNsqQueue(c, handler, options))
	}

	return q, nil
}

func newNsqQueue(c NqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
	// var offset int64
	// if c.Offset == firstOffset {
	// offset = kafka.FirstOffset
	// } else {
	// offset = kafka.LastOffset
	// }
	config := nsq.NewConfig()
	config.DefaultRequeueDelay = 0
	config.MaxBackoffDuration = time.Millisecond * 50
	config.DialTimeout = 5 * time.Second
	config.LookupdPollInterval = 5 * time.Second
	if len(c.Channel) == 0 {
		c.Channel = "default"
	}
	consumer, err := nsq.NewConsumer(c.Topic, c.Channel, config)
	if err != nil {
		return nil
	}
	consumer.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags), nsq.LogLevelError)
	que := &nsqQueue{
		c:                c,
		consumer:         consumer,
		handler:          handler,
		channel:          make(chan *nsq.Message),
		producerRoutines: threading.NewRoutineGroup(),
		consumerRoutines: threading.NewRoutineGroup(),
		metrics:          options.metrics,
	}
	consumer.AddHandler(que)
	return que
}

func (q *nsqQueue) Start() {
	q.startConsumers()
	q.startProducers()

	q.producerRoutines.Wait()
	// close(q.channel)
	q.consumerRoutines.Wait()
}

func (q *nsqQueue) Stop() {
	if q.consumer != nil {
		q.consumer.Stop()
	}
	logx.Close()
}

func (q *nsqQueue) consumeOne(key, val string) error {
	startTime := timex.Now()
	err := q.handler.Consume(key, val)
	q.metrics.Add(stat.Task{
		Duration: timex.Since(startTime),
	})
	return err
}

func (q *nsqQueue) startConsumers() {
	for i := 0; i < q.c.Processors; i++ {
		q.consumerRoutines.Run(func() {
			for msg := range q.channel {
				key := q.c.Topic
				if err := q.consumeOne(key, exbytes.ToString(msg.Body)); err != nil {
					logx.Errorf("Error on consuming: %s:%s, error: %v", key, exbytes.ToString(msg.Body), err)
				}
			}
		})
	}
}

func (q *nsqQueue) startProducers() {
	for i := 0; i < q.c.Consumers; i++ {
		q.producerRoutines.Run(func() {
			for {
				err := q.consumer.ConnectToNSQLookupds(q.c.Brokers)
				if err != nil {
					logx.Errorf("Error on reading message, %q", err.Error())
					continue
				}
				break
			}
		})
	}
}

//new add
func (q *nsqQueue) HandleMessage(message *nsq.Message) error {
	defer func() {
		recover()
	}()
	q.channel <- message
	return nil
}

func (q nsqQueues) Start() {
	for _, each := range q.queues {
		q.group.Add(each)
	}
	q.group.Start()
}

func (q nsqQueues) Stop() {
	q.group.Stop()
}

func WithHandle(handle ConsumeHandle) ConsumeHandler {
	return innerConsumeHandler{
		handle: handle,
	}
}

func WithMetrics(metrics *stat.Metrics) QueueOption {
	return func(options *queueOptions) {
		options.metrics = metrics
	}
}

type innerConsumeHandler struct {
	handle ConsumeHandle
}

func (ch innerConsumeHandler) Consume(k, v string) error {
	return ch.handle(k, v)
}

func ensureQueueOptions(c NqConf, options *queueOptions) {
	if options.metrics == nil {
		options.metrics = stat.NewMetrics(proc.ProcessName())
	}
}
